package com.atguigu.app.func;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.bean.TableProcess;
import com.atguigu.utils.JdbcUtil;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

import java.sql.Connection;
import java.sql.DriverManager;
import java.util.*;

/**
 * Routes main-stream change records according to table configuration.
 *
 * <p>Configuration is looked up under the composite key {@code "<source_table>-<source_type>"}
 * in two places: the broadcast state (fed by the broadcast/config stream) and a local map that is
 * pre-loaded over JDBC in {@link #open(Configuration)}. The pre-loaded map lets main-stream records
 * be matched even before the corresponding broadcast element has been processed. When both hold an
 * entry for a key, the broadcast state wins.
 *
 * <p>Matching records have their {@code data} object reduced to the configured sink columns and
 * get a {@code sink_table} field added before being emitted; non-matching records are dropped.
 */
public class DbTableProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {

    // NOTE(review): the connection string (including credentials) is hard-coded; consider moving
    // it to job configuration.
    private static final String CONFIG_JDBC_URL = "jdbc:mysql://hadoop102:3306/gmall-220718-config?user=root&password=000000&useUnicode=true&characterEncoding=utf8&serverTimeZone=Asia/Shanghai&useSSL=false";

    private final MapStateDescriptor<String, TableProcess> mapStateDescriptor;

    // Populated in open(); transient because it is rebuilt there and never needs to be shipped
    // with the serialized function.
    private transient Map<String, TableProcess> tableProcessHashMap;

    /**
     * @param mapStateDescriptor descriptor used to access the broadcast state that holds the
     *                           table configuration
     */
    public DbTableProcessFunction(MapStateDescriptor<String, TableProcess> mapStateDescriptor) {
        this.mapStateDescriptor = mapStateDescriptor;
    }

    /**
     * Pre-loads the rows of {@code table_process} with {@code sink_type='dwd'} into the local map.
     *
     * @param parameters configuration object passed to the function (not read here)
     * @throws Exception if the connection cannot be opened or the query fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        Map<String, TableProcess> preloaded = new HashMap<>();

        // try-with-resources: the connection is closed even when the query throws.
        try (Connection connection = DriverManager.getConnection(CONFIG_JDBC_URL)) {
            List<TableProcess> tableProcesses = JdbcUtil.queryList(connection,
                    "select * from table_process where sink_type='dwd'",
                    TableProcess.class,
                    true);

            for (TableProcess tableProcess : tableProcesses) {
                preloaded.put(buildKey(tableProcess.getSourceTable(), tableProcess.getSourceType()), tableProcess);
            }
        }

        tableProcessHashMap = preloaded;
    }

    /**
     * Applies one configuration change event to the broadcast state.
     *
     * <p>Example value:
     * <pre>
     * {"before":null,"after":{"source_table":"base_category3","source_type":"insert","sink_table":"dim_base_category3","sink_columns":"id,name,category2_id","sink_pk":"id","sink_extend":null},"source":{...,"db":"gmall-220623-config","table":"table_process",...},"op":"r","ts_ms":1669162876406,"transaction":null}
     * </pre>
     *
     * <ul>
     *   <li>{@code op == "d"}: the key built from {@code before} is removed from the broadcast
     *       state and from the pre-loaded map.</li>
     *   <li>otherwise: {@code after} is parsed into a {@link TableProcess} and stored under its
     *       key. For {@code op == "u"}, if the key built from {@code before} differs from the new
     *       key, the old key is removed first so the outdated configuration stops matching.</li>
     * </ul>
     * Events lacking the image they need ({@code before} for deletes, {@code after} otherwise)
     * are ignored.
     *
     * @param value change event as a JSON string
     * @param ctx   context giving write access to the broadcast state
     * @param out   collector (unused; this method emits nothing)
     * @throws Exception if accessing the broadcast state fails
     */
    @Override
    public void processBroadcastElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {

        JSONObject jsonObject = JSON.parseObject(value);
        BroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);

        String op = jsonObject.getString("op");
        JSONObject before = jsonObject.getJSONObject("before");

        // A deleted configuration row must also disappear from the state and the pre-loaded map.
        if ("d".equals(op)) {
            if (before != null) {
                String key = buildKey(before.getString("source_table"), before.getString("source_type"));
                broadcastState.remove(key);
                tableProcessHashMap.remove(key);
            }
            return;
        }

        // Parse the new row image; a missing "after" yields null and there is nothing to store.
        TableProcess tableProcess = JSON.parseObject(jsonObject.getString("after"), TableProcess.class);
        if (tableProcess == null) {
            return;
        }

        String key = buildKey(tableProcess.getSourceTable(), tableProcess.getSourceType());

        // An update may change source_table/source_type; drop the entry stored under the old key.
        if ("u".equals(op) && before != null) {
            String previousKey = buildKey(before.getString("source_table"), before.getString("source_type"));
            if (!previousKey.equals(key)) {
                broadcastState.remove(previousKey);
                tableProcessHashMap.remove(previousKey);
            }
        }

        // Write the configuration into the broadcast state.
        broadcastState.put(key, tableProcess);
    }

    /**
     * Filters and enriches one main-stream record.
     *
     * <p>Example value:
     * <pre>
     * {"database":"gmall-220623-flink","table":"comment_info","type":"insert","ts":1669162958,"xid":1111,"xoffset":13941,"data":{"id":1595211185799847960,"user_id":119,"nick_name":null,"head_img":null,"sku_id":31,"spu_id":10,"order_id":987,"appraise":"1204","comment_txt":"...","create_time":"2022-08-02 08:22:38","operate_time":null}}
     * </pre>
     *
     * <p>The lookup key is {@code "<table>-<type>"}. Records with no configuration under that key
     * (for example types such as bootstrap-start / bootstrap-complete that are not configured) are
     * dropped after printing a message.
     *
     * @param value main-stream record; mutated in place (columns filtered, {@code sink_table} added)
     * @param ctx   context giving read-only access to the broadcast state
     * @param out   collector receiving the enriched record
     * @throws Exception if accessing the broadcast state fails
     */
    @Override
    public void processElement(JSONObject value, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {

        ReadOnlyBroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
        String key = buildKey(value.getString("table"), value.getString("type"));

        // Prefer the broadcast state; fall back to the configuration pre-loaded in open().
        TableProcess tableProcess = broadcastState.get(key);
        if (tableProcess == null) {
            tableProcess = tableProcessHashMap.get(key);
        }

        if (tableProcess == null) {
            System.out.println("没有该组合Key：" + key);
            return;
        }

        // Column filtering only applies when the record actually carries a data object.
        JSONObject data = value.getJSONObject("data");
        if (data != null) {
            filterColumns(data, tableProcess.getSinkColumns());
        }

        // Add the sink_table field; the original comment labels it as the Kafka topic name.
        value.put("sink_table", tableProcess.getSinkTable());
        out.collect(value);
    }

    /**
     * Removes from {@code data} every field whose name is not listed in {@code sinkColumns}.
     *
     * <p>Example: data {@code {"id":"12","tm_name":"aaaa","logo_url":"xxx"}} with sinkColumns
     * {@code "id,tm_name"} becomes {@code {"id":"12","tm_name":"aaaa"}}.
     *
     * @param data        record payload, modified in place; must not be null
     * @param sinkColumns comma-separated column names to keep (matched exactly, no trimming);
     *                    must not be null
     */
    private void filterColumns(JSONObject data, String sinkColumns) {
        // A hash set gives constant-time membership checks for each field of the record.
        Set<String> retained = new HashSet<>(Arrays.asList(sinkColumns.split(",")));
        data.entrySet().removeIf(entry -> !retained.contains(entry.getKey()));
    }

    /** Builds the composite lookup key {@code "<table>-<type>"}. */
    private static String buildKey(String table, String type) {
        return table + "-" + type;
    }
}